feat(records): stream-type-aware concurrency limits#2688
Draft
andersfylling wants to merge 16 commits into
Draft
feat(records): stream-type-aware concurrency limits#2688andersfylling wants to merge 16 commits into
andersfylling wants to merge 16 commits into
Conversation
Model the Records API's hierarchical rate-limit budgets for mutable and immutable streams. Write semaphore (20) is shared across both stream types. Query, retrieve, and aggregate each have separate mutable and immutable semaphores matching the API's documented limits. Retrieve and aggregate have dedicated budgets checked before the shared query budget (both must pass). The semaphore helpers on RecordsAPI are wired up for all three tiers, ready for use by endpoint implementations. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
9360953 to
889fed8
Compare
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Single method handles all operation types (write, delete, query, retrieve, aggregate) with stream_type routing for read operations. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…rcement Retrieve and aggregate must pass both their dedicated budget AND the shared query budget. HierarchicalBoundedSemaphore acquires multiple semaphores in order (dedicated first, query second) and releases in reverse. _get_semaphore returns this composite for retrieve/aggregate operations. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2688 +/- ##
==========================================
+ Coverage 93.65% 93.74% +0.08%
==========================================
Files 498 498
Lines 50391 50941 +550
==========================================
+ Hits 47196 47754 +558
+ Misses 3195 3187 -8
🚀 New features to boost your workflow:
|
…maphore routing - HierarchicalBoundedSemaphore: acquires both, releases both, releases on exception, limits concurrency to the min of both semaphores - _get_semaphore: returns plain semaphore for write/delete/query, HierarchicalBoundedSemaphore for retrieve/aggregate, wraps the correct dedicated+query pair, and shares the query semaphore Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…eption-safe Two bugs found by adversarial testing: 1. Cancellation during __aenter__ (e.g. via asyncio.wait_for timeout) leaked already-acquired semaphores — now rolls back on any BaseException during acquisition. 2. Exception in one semaphore's __aexit__ skipped releasing the rest — now continues releasing all semaphores and re-raises the first error. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Raises ValueError at config time (constructor and setters) if a dedicated budget (retrieve/aggregate) exceeds its corresponding shared query budget, since the hierarchical semaphore would never use the extra slots. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Each setter validates only its own dedicated-vs-shared relationship instead of re-checking all four pairs. Shared setters (query_*) validate against existing dedicated values; dedicated setters validate against the current shared value. Constructor uses the property setters directly, setting shared budgets first so dedicated setters can validate. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Each setter passes its own name=value as a kwarg override. The method resolves each budget from overrides or self, then checks all four dedicated-vs-shared pairs. Init assigns directly and calls it once. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Setters are now just _validate_budgets(name=value) + assign. The validation method handles both the frozen check (from the override keys) and the budget invariant in one call. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- ruff format: wrap long ternary expressions - mypy: add type: ignore[override] on _get_semaphore (wider signature than base class, intentional) and type: ignore[arg-type] on test-only BoomSemaphore fakes Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…client Three tests that push a HierarchicalBoundedSemaphore through the actual SDK _post → _http_client._with_retry → async with chain: - successful request acquires and releases both semaphores - HTTP 500 error still releases both semaphores - concurrent requests are limited by the tighter semaphore Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Seven tests exercising the exact _get_semaphore → _post chain that list/retrieve/aggregate endpoints will use, for both mutable and immutable streams. Verifies correct semaphore types, values, release after request, and that retrieve shares the query semaphore with list. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why
The Records API enforces hierarchical rate limits that differ between mutable and immutable streams. The SDK needs to model these so that concurrent requests stay within budget without relying solely on server-side 429 retries.
Key constraints from the API:
query_{type}retrieve_{type}→query_{type}aggregate_{type}→query_{type}writeWhat
RecordsGlobalConcurrencyConfigwith separate semaphores per budget tier and stream type (7 total: write, query/retrieve/aggregate × mutable/immutable).HierarchicalBoundedSemaphorethat acquires multiple semaphores in order for retrieve/aggregate, with cancellation-safe rollback and exception-safe release.RecordsAPI._get_semaphore(operation, stream_type)routes to the correct semaphore(s).🤖 Generated with Claude Code